package cn.tedu.charging.order.mqtt;

import cn.tedu.charging.order.common.ChargingConstants;
import cn.tedu.charging.order.common.MqttConstants;
import cn.tedu.charging.order.common.OrderStatusConstant;
import cn.tedu.charging.order.feign.CostClient;
import cn.tedu.charging.order.pojo.dto.ChargingProcessDto;
import cn.tedu.charging.order.pojo.dto.ChargingResultDto;
import cn.tedu.charging.order.pojo.po.ChargingBillFailPO;
import cn.tedu.charging.order.pojo.po.ChargingBillSuccessPO;
import cn.tedu.charging.order.pojo.po.ChargingProcessPO;
import cn.tedu.charing.common.pojo.JsonResult;
import cn.tedu.charing.common.pojo.param.ChargingProcessParam;
import cn.tedu.charing.common.pojo.vo.ChargingProcessVO;
import cn.tedu.charing.common.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.BeanUtils;

/**
 * EMQX消息中间件 消息的消费者
 * 订单服务 用来 订阅 和 接收 设备发送的消息
 */
@Slf4j
public class MqttConsumer implements MqttCallbackExtended {

    private MqttClient mqttClient;

    private MqttContext mqttContext;


    /**
     * 通过写方法 setter 设置 mqttClient
     * @param mqttClient
     */
    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    /**
     * 通过构造器 设置  MqttClient
     * @param mqttClient
     */
    public MqttConsumer(MqttContext mqttContext,MqttClient mqttClient) {
        this.mqttClient = mqttClient;
        this.mqttContext = mqttContext;
    }

    /**
     * 连接EMQX 成功后,订阅Topic
     * @param reconnect If true, the connection was the result of automatic reconnect.
     * @param serverURI The server URI that the connection was made to.
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.debug("MqttConsumer#connectComplete方法被调用了,表示连接成功了,reconnect:{},serverURI:{}",
                reconnect,serverURI);
        try {
//          开始充电
            log.debug("连接成功,订阅开始充电指令结果 topic:{}",MqttConstants.TOPIC_CHARGING_RESULT);
            mqttClient.subscribe(MqttConstants.TOPIC_CHARGING_RESULT);
            log.debug("连接成功,成功订阅开始充电指令结果 topic:{},",MqttConstants.TOPIC_CHARGING_RESULT);
//          充电进展
            log.debug("连接成功,订阅充电进度 topic:{}",MqttConstants.TOPIC_CHARGING_PROCESS);
            mqttClient.subscribe(MqttConstants.TOPIC_CHARGING_PROCESS);
            log.debug("连接成功,成功订阅充电进度指令结果 topic:{},",MqttConstants.TOPIC_CHARGING_PROCESS);
        } catch (MqttException e) {
            //打印日志 如果异常重试,重试几次后不成功,告警
            throw new RuntimeException(e);
        }
        //mqttClient.setCallback();
    }

    @Override
    public void connectionLost(Throwable cause) {

    }

    /**
     * messageArrived 指的是 设备发送的消息 到达了 订单服务
     *  设备发送的指令  --->EMQX ---> 订单服务
     * @param topic name of the topic on the message was published to
     * @param message the actual message.
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.debug("MqttConsumer#messageArrived方法被调用了,表示消息到达,topic:{},message:{}",
                topic,message);
        if(MqttConstants.TOPIC_CHARGING_RESULT.equals(topic)){
            log.debug("处理开始充电的结果消息");
            handleChargingResult(message);
        }else if(MqttConstants.TOPIC_CHARGING_PROCESS.equals(topic)){
            log.debug("处理充电进展消息");
            handleChargingProcess(message);
        }
    }

    private void handleChargingResult(MqttMessage message) {
        ChargingResultDto chargingresultDto = null;
        //把设备发送的消息JSON转换为java对象 chargingresultDto
        try{
            chargingresultDto = JsonUtils.fromJson(message.toString(),ChargingResultDto.class);
            log.debug("消息转换为对象:{}",chargingresultDto);
            if (chargingresultDto != null) {
                //获取开始充电指令设备给的返回响应 开始充电成功/开始充电失败
                String result = chargingresultDto.getResult();
                //判断设备响应结果
                if (ChargingConstants.RESULT_START_CHARGING_SUCCESS.equals(result)) {
                    log.debug("处理开始充电的结果消息");
                    ChargingBillSuccessPO chargingBillSuccessPO = new ChargingBillSuccessPO();
                    chargingBillSuccessPO.setBillId(chargingresultDto.getOrderNo());
                    chargingBillSuccessPO.setBillStatus(OrderStatusConstant.ORDER_STATUS_PROCESS);
                    log.debug("保存成功订单记录:{}",chargingBillSuccessPO);
                    Integer row = mqttContext.getOrderSuccessMapper().insert(chargingBillSuccessPO);
                    log.debug("保存成功订单记录:{},影响行数:{}",chargingBillSuccessPO,row);
                }else {
                    log.debug("设备有响应,设备开始充电失败");
                    ChargingBillFailPO chargingBillFailPO = new ChargingBillFailPO();
                    chargingBillFailPO.setBillId(chargingresultDto.getOrderNo());
                    chargingBillFailPO.setFailDesc("设备有响应,开始充电失败");
                    log.debug("保存失败订单记录:{}",chargingBillFailPO);
                    Integer row = mqttContext.getOrderFailMapper().insert(chargingBillFailPO);
                    log.debug("保存失败订单记录:{},影响行数:{}",chargingBillFailPO,row);
                    log.debug("通知充电的用户,表示歉意,提醒用户更换充电枪");
                    log.debug("通知设备管理员,进行设备的检修");
                }
            }
        }catch (Exception e) {
            log.error("设备发送的开始充电结果消息{},格式有问题,请检查!", message,e);
        }
    }
    private void handleChargingProcess(MqttMessage message) {
        ChargingProcessDto chargingProcessDto = null;
        //把设备发送的消息JSON转换为java对象 chargingProcessDto
        try{
            chargingProcessDto = JsonUtils.fromJson(message.toString(),ChargingProcessDto.class);
            log.debug("消息转换为对象:{}",chargingProcessDto);
            if (chargingProcessDto != null) {

                //严谨的写 在获取到设备同步的数据后,直接保存
                //调用计价服务获取价格,获取价格成功后,更新到同步的数据
                //保存充电同步数据
//                saveChargingProcess();

                //计算电费
                ChargingProcessVO orderCost = getCost(chargingProcessDto);
                if (orderCost != null) {
                    //推送数据给用户 websocket
                    sendChargingProcess2User(orderCost);

                    //如果保存数据的方法写到这
                    //调用计价成功后,才能成功保存充电数据
                    saveChargingProcess(chargingProcessDto,orderCost);

                    //订单处理计价数据 todo
                    //订单服务是否要存储  设备每次同步的充电状态 数据 海量数据存储 ES
                    //实时通知用户(WebSocket) 充电进度,设备状态(温度),
                    // 1 先享后付  不需要实现的判断用户的余额 用户余额是否充足
                    // 2 先付后享  先充值,后消费,实时判断用户的余额,如果用户余额不足,提醒用户充值,停止充电
                    //判断是否充满 设备同步充满了,订单状态该为 已经完成
                }else {
                    log.error("获取价格失败入参:{},打印错误日志,接入告警系统,告警告警告警,",chargingProcessDto);
                }
            }
        }catch (Exception e) {
            log.error("处理设备同步数据:{} 出现异常,请检查!", message,e);
        }
    }
    /**
     * 推送充电进度给用户
     */
    private void sendChargingProcess2User(ChargingProcessVO chargingProcessVO) {
        //把chargingProcessVO 转换为 json
        String json = JsonUtils.toJson(chargingProcessVO);
        log.debug("同步充电数据给用户:{}",chargingProcessVO);
        Boolean success = mqttContext.getWebSocketServer().sendMessage(chargingProcessVO.getUserId(), json);
        log.debug("同步充电数据给用户结果:{},用户id:{},同步数据:{}",success,
                chargingProcessVO.getUserId(),
                chargingProcessVO);
    }

    /**
     * 调用计价服务获取价格信息
     * @param chargingProcessDto
     * @return
     */
    private ChargingProcessVO getCost(ChargingProcessDto chargingProcessDto) {
        CostClient costClient = mqttContext.getCostClient();
        ChargingProcessParam chargingProcessParam = new ChargingProcessParam();
        BeanUtils.copyProperties(chargingProcessDto,chargingProcessParam);
        log.debug("调用计价服务入参:{}",chargingProcessParam);
        JsonResult<ChargingProcessVO> result =
                costClient.calculateCost(chargingProcessParam);
        log.debug("调用计价服务入参:{},返回结果:{}",chargingProcessParam,result);
        if (result != null) {
            return result.getData();
        }
        return null;
    }

    private void saveChargingProcess(ChargingProcessDto chargingProcessDto, ChargingProcessVO chargingProcessVO){
        ChargingProcessPO chargingProcessPO = new ChargingProcessPO();
        //设置id 为 充电数据同步的时间
        chargingProcessPO.setId(System.currentTimeMillis());
        //设置数据
        BeanUtils.copyProperties(chargingProcessDto,chargingProcessPO);
        //设置订单金额
        chargingProcessPO.setTotalCost(chargingProcessVO.getTotalCost());
        log.debug("保存充电进度数据:{}到ES",chargingProcessPO);
        mqttContext.getChargingProcessESRepository().save(chargingProcessPO);
        log.debug("保存充电进度数据:{} 到ES成功",chargingProcessPO);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }
}
